package com.deepuser.idmapping.service.impl;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.deepuser.mq.kafka.utils.KafkaUtil;
import com.deepuser.utils.RedisUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener;

import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.UnaryOperator;

/**
 * kafka 消费类
 * @author gourd
 */
@Slf4j
public class KafkaConsumerService {

    @Value("#{${deepuser.idmapping}}")
    public LinkedHashMap<String,HashMap> hashMap;



    @KafkaListener(topics = "${spring.kafka.consumer.gourd-topic1}", groupId = "${spring.kafka.consumer.group-id}" ,containerFactory = "kafkaListenerContainerFactory")
    public void kafkaConsumerIdMapping(ConsumerRecord<String, String> record) {
        log.info("消消费消息 topic = {} , content = {}",record.topic(),record.value());
        String messageText = record.value();
        // 业务代码......
        log.info("消费到的信息==="+messageText);
        JSONObject result = null;
        JSONObject returnResult = null;
        try {
            result = JSON.parseObject(messageText);
            returnResult = JSON.parseObject(messageText);

        }catch (Exception e){
            log.error("kafka数据消费异常！！！");
        }
        if (null != result){
            //IdMapping
            if(result.keySet().retainAll(hashMap.keySet())){
                //遍历配置文件
                Iterator<Map.Entry<String,HashMap>> it =  hashMap.entrySet().iterator();
                LinkedHashMap<String,Integer> idMap = new LinkedHashMap<>();
                while(it.hasNext()){
                    Map.Entry<String,HashMap> entry=it.next();
                    if(result.containsKey(entry.getKey())){
                        //IdMap中存放result中存在的id值，用"："处理过
                        idMap.put(entry.getKey()+":"+result.get(entry.getKey()).toString(),Integer.parseInt(entry.getValue().get("level").toString()));
                    }
                }
                HashMap<String,Integer> mappingId = new HashMap<>();
                try{
                    AtomicInteger i = new AtomicInteger();
                    AtomicInteger j = new AtomicInteger();
                    //遍历idMap的id值
                    idMap.keySet().stream().forEach(item ->{
                        i.getAndIncrement();
                        if(RedisUtil.existAny(item.split(":")[1])){
                            j.getAndIncrement();
                            //mappingID存放映射id
                            mappingId.put(RedisUtil.get(item.split(":")[1]).toString(),idMap.get(item));
                        }
                    });
                    //判断id是否都已经处理
                    if(i.get()==j.get() && mappingId.size()==1){
                        returnResult.put("superId",mappingId.keySet().stream().toArray()[0]);
                        KafkaUtil.sendTopicMessage("deepuser",JSON.toJSONString(returnResult));
                        return;
                    }
                }catch (Exception e){
                }
                if(mappingId.size()!=0){
                    AtomicReference<String> realIdMapping = new AtomicReference<>("");
                    AtomicReference<Integer> i = new AtomicReference<>(0);
                    //新的id映射生成及修改
                    if (mappingId.keySet().stream().distinct().count()!=1){
                        mappingId.keySet().forEach(item ->{
                            if(0 == i.get()){
                                i.set(mappingId.get(item));
                                realIdMapping.set(item);
                            }
                            if(i.get()>mappingId.get(item)){
                                i.set(mappingId.get(item));
                                realIdMapping.set(item);
                            }
                        });
                    }else {
                        for (String j: mappingId.keySet()){
                            realIdMapping.set(j);
                        }
                    }
                    //判断idMap中是否有root，但是没有映射id
                    idMap.keySet().forEach(items->{
                        HashMap mapp = hashMap.get(items.split(":")[0]);
                        if("1".equals(mapp.get("isSingle").toString())){
                            if(!RedisUtil.existAny(items.split(":")[1])){
                                //将重新生成IdMapping
                                realIdMapping.updateAndGet(new UnaryOperator<String>() {
                                    @Override
                                    public String apply(String s) {
                                        return items.split(":")[1];
                                    }
                                });
                            }
                        };
                    });
                    idMap.keySet().forEach(items ->{
                        //IDMapping输出
                        this.checkMappingId(items.split(":")[1],realIdMapping.get());
                        RedisUtil.set(items.split(":")[1],realIdMapping.get());
                    });
                    result.put("superId",realIdMapping.get());
                }else {
                    //需要生成IDMapping
                    AtomicReference<String> realIdMapping = new AtomicReference<>("");
                    Iterator<Map.Entry<String,HashMap>> its =  hashMap.entrySet().iterator();
                    while(its.hasNext()){
                        Map.Entry<String,HashMap> entry=its.next();
                        if(result.containsKey(entry.getKey())){
                            if (realIdMapping.get()==""){
                                realIdMapping.set(result.get(entry.getKey()).toString());
                            }
                            log.info("IDMAPPING...ing-->{}:{}",result.get(entry.getKey()).toString(),realIdMapping.get());
                            //IDMapping输出
                            this.checkMappingId(result.get(entry.getKey()).toString(),realIdMapping.get());
                            RedisUtil.set(result.get(entry.getKey()).toString(),realIdMapping.get());
                        }
                    }
                    returnResult.put("superId",realIdMapping.get());
                }
                KafkaUtil.sendTopicMessage("deepuser",JSON.toJSONString(returnResult));
            }
        }
    }

    public void checkMappingId(String id,String value){
        //他的主要职责就是判断ID有无更新，id全局变更
        if(RedisUtil.existAny(id)){
            if(!RedisUtil.get(id).toString().equals(value)){
                //id全局变更逻辑
                //.........
                //.........
            }
        }
    }

}
